<?php

namespace App\Common\Libs;

use App\Consts\CacheKeyConst;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Validator;

/**
 * RabbitMQ client.
 *
 * Process-wide singleton wrapping the php-amqp extension objects
 * (connection, channel, exchange, queue). Public operations never throw
 * \Exception to the caller; they return the "standard output" array built by
 * {@see RmqClient::success()} / {@see RmqClient::error()}:
 * ['code' => string, 'data' => mixed, 'msg' => string], where code '0' means OK.
 *
 * Class RmqClient
 * @package App\Common\Libs
 */
class RmqClient
{

    /**
     * @var RmqClient|null $instance Singleton instance.
     */
    private static $instance = null;

    /**
     * @var array $config Connection settings handed to \AMQPConnection.
     */
    private $config = [];

    /**
     * @var \AMQPConnection|null $connection Broker connection.
     */
    private $connection = null;

    /**
     * @var \AMQPChannel|null $channel Channel opened on the connection.
     */
    private $channel = null;

    /**
     * @var \AMQPExchange|null $exchange Most recently declared exchange.
     */
    private $exchange = null;

    /**
     * @var \AMQPQueue|null $queue Most recently declared queue.
     */
    private $queue = null;

    /**
     * Constructor (private: use getInstance()).
     *
     * Stores the configuration and connects to the broker immediately.
     *
     * @param array $config Connection settings; when empty the 'default'
     *                      entry of the rabbitmq config is used.
     * @throws \AMQPConnectionException
     * @throws \Exception When the broker connection cannot be established.
     */
    private function __construct($config = [])
    {
        $this->config = $config ?: self::getConfig();
        // A freshly constructed object never has a connection yet, so connect unconditionally.
        $this->connect($this->config);
    }

    /**
     * Singleton entry point.
     *
     * @return RmqClient
     */
    public static function getInstance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    /**
     * Read the connection settings of one channel from the rabbitmq config.
     *
     * @param string $channel Key inside config('rabbitmq').
     * @return mixed The configured value, or null when the key (or the whole
     *               rabbitmq config) is missing.
     */
    public static function getConfig($channel = 'default')
    {
        $config = config('rabbitmq');
        // isset() guard: an unknown channel yields null without an undefined-index notice.
        return isset($config[$channel]) ? $config[$channel] : null;
    }

    /**
     * Open a connection and a channel, replacing any previously held ones.
     *
     * @param array $config Connection settings; falls back to $this->config when empty.
     * @throws \AMQPConnectionException
     * @throws \Exception With code 1002 when connect() reports failure.
     */
    protected function connect($config = [])
    {
        $config = $config ?: $this->config;
        $this->connection = new \AMQPConnection($config);
        if (!$this->connection->connect()) {
            throw new \Exception("Cannot connect to the broker", 1002);
        }
        $this->channel = new \AMQPChannel($this->connection);
    }

    /**
     * Declare a durable exchange on the current channel and remember it.
     *
     * @param string $exchangeName
     * @param string $type Exchange type (direct, fanout, topic, ...).
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPExchangeException
     */
    protected function createExchange($exchangeName, $type = AMQP_EX_TYPE_DIRECT)
    {
        $this->exchange = new \AMQPExchange($this->channel);
        $this->exchange->setName($exchangeName);
        $this->exchange->setType($type);
        $this->exchange->setFlags(AMQP_DURABLE);
        $this->exchange->declareExchange();
    }

    /**
     * Declare a durable queue and bind it to the exchange.
     *
     * The queue name doubles as the routing key of the binding.
     *
     * @param string $exchangeName
     * @param string $queueName
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     */
    protected function createQueue($exchangeName, $queueName)
    {
        $this->queue = new \AMQPQueue($this->channel);
        $this->queue->setName($queueName);
        $this->queue->setFlags(AMQP_DURABLE);
        $this->queue->declareQueue();
        $this->queue->bind($exchangeName, $queueName);
    }

    /**
     * Consume messages from a queue.
     * [!] Blocking: does not return until the consumer callback returns false
     * or an exception is raised.
     *
     * The BLOCKING_RABBIT_MQ_STATUS cache flag acts as a run switch: it is
     * checked once before consuming starts and again on every delivered
     * message. When it is falsy the channel and connection are closed and the
     * flag is removed.
     *
     * @param string $exchangeName
     * @param string $queueName
     * @param string $type Exchange type.
     * @param bool $autoAck Consume with AMQP_AUTOACK when true.
     * @return array $aRes Standard output.
     */
    public function consumer($exchangeName, $queueName, $type = 'direct', $autoAck = false)
    {
        $check = $this->checkParams(['exchange' => $exchangeName, 'queue' => $queueName, 'type' => $type]);
        if (isset($check['code'])) {
            return $check;
        }
        try {
            // Open a fresh broker connection for this consumer run.
            $this->connect();
            // Declare the exchange.
            $this->createExchange($exchangeName, $type);
            // Declare the queue and bind it.
            $this->createQueue($exchangeName, $queueName);

            // Run switch is off before we even start: tear down and report.
            if (!getCacheByRetry(CacheKeyConst::BLOCKING_RABBIT_MQ_STATUS)) {
                $this->stopConsuming();
                return self::error('[heartbeat] channel mismatch : stop by handles.', 9200);
            }

            $that = $this;
            // Per-message callback; returning false ends the blocking consume() call.
            $callback = function (\AMQPEnvelope $envelope, \AMQPQueue $queue) use ($that) {
                if (Cache::store('redis')->get(CacheKeyConst::BLOCKING_RABBIT_MQ_STATUS)) {
                    return service()->RabbitMQService->consumer($envelope, $queue);
                }
                $that->stopConsuming();
                return false;
            };
            $flags = $autoAck ? AMQP_AUTOACK : AMQP_NOPARAM;
            // Blocks here.
            $this->queue->consume($callback, $flags);
        } catch (\Exception $e) {
            Cache::store('redis')->forget(CacheKeyConst::BLOCKING_RABBIT_MQ_STATUS);
            return self::error($e->getMessage(), $e->getCode());
        }
        return self::success();
    }

    /**
     * Close the channel and connection and clear the consumer run flag.
     *
     * @throws \AMQPConnectionException
     */
    private function stopConsuming()
    {
        $this->channel->close();
        $this->connection->disconnect();
        Cache::store('redis')->forget(CacheKeyConst::BLOCKING_RABBIT_MQ_STATUS);
    }

    /**
     * Publish a message.
     *
     * The body is JSON-encoded before it is handed to the exchange.
     *
     * @param string $exchangeName
     * @param string $routeKey Routing key.
     * @param array $body Message payload.
     * @param string $type Exchange type.
     * @return array $aRes Standard output.
     */
    public function publisher($exchangeName, $routeKey, $body, $type = 'direct')
    {
        // Validate parameters.
        $check = $this->checkParams(['exchange' => $exchangeName, 'queue' => $routeKey, 'type' => $type, 'body' => $body]);
        if (isset($check['code'])) {
            return $check;
        }
        // json_encode() returns false on failure (e.g. invalid UTF-8); never publish that.
        $payload = json_encode($body);
        if ($payload === false) {
            return self::error('Failed to encode message body: ' . json_last_error_msg(), 90012);
        }
        try {
            // Declare the exchange.
            $this->createExchange($exchangeName, $type);
            // Push the message.
            $this->exchange->publish($payload, $routeKey);
        } catch (\Exception $e) {
            return self::error($e->getMessage(), $e->getCode());
        }
        return self::success();
    }

    /**
     * Fetch a single message from a queue (non-blocking).
     *
     * @param string $exchangeName
     * @param string $queueName
     * @param bool $forceDelete Fetch with AMQP_AUTOACK when true.
     * @param string $type Exchange type.
     * @return array $aRes Standard output.
     * $aRes['data'] => whatever \AMQPQueue::get() returned
     */
    public function get($exchangeName, $queueName, $forceDelete = false, $type = 'direct')
    {
        // Validate parameters. 'type' must be forwarded: checkParams() treats it as
        // required, so omitting it made every call fail validation.
        $check = $this->checkParams(['exchange' => $exchangeName, 'queue' => $queueName, 'type' => $type]);
        if (isset($check['code'])) {
            return $check;
        }
        try {
            // Declare the exchange.
            $this->createExchange($exchangeName, $type);
            // Declare the queue.
            $this->createQueue($exchangeName, $queueName);
            $flags = $forceDelete ? AMQP_AUTOACK : AMQP_NOPARAM;
            $objMessage = $this->queue->get($flags);
        } catch (\Exception $e) {
            return self::error($e->getMessage(), $e->getCode());
        }
        return self::success($objMessage);
    }

    /**
     * Acknowledge a message on the most recently declared queue.
     *
     * @param mixed $tag Delivery tag of the message.
     * @return array $aRes Standard output.
     */
    public function ack($tag)
    {
        // Without a declared queue there is nothing to ack on; calling a method on
        // null would raise an \Error that the catch below does not cover.
        if ($this->queue === null) {
            return self::error('No queue has been declared on this client.');
        }
        try {
            $this->queue->ack($tag);
        } catch (\Exception $e) {
            return self::error($e->getMessage(), $e->getCode());
        }
        return self::success();
    }

    /**
     * Private to prevent cloning of the singleton.
     */
    private function __clone()
    {
    }

    /**
     * Destructor: best-effort release of the channel and the connection.
     *
     * The handles may be null (the constructor threw before connecting) or
     * already closed (consumer shut them down), so each step is guarded and
     * failures are swallowed — a destructor must not throw.
     */
    public function __destruct()
    {
        if ($this->channel !== null) {
            try {
                $this->channel->close();
            } catch (\Exception $e) {
                // Already closed or broker gone: nothing left to release.
            }
        }
        if ($this->connection !== null) {
            try {
                $this->connection->disconnect();
            } catch (\Exception $e) {
                // Same as above.
            }
        }
        self::$instance = null;
    }

    /**
     * Validate the common parameters of the public operations.
     *
     * 'exchange', 'queue' and 'type' are always required strings. When the
     * 'body' key is present (even with a null value) it must be a non-empty array.
     *
     * @param array $params
     * @return array|bool true when valid, otherwise a standard error array
     *                    (code 90010 for names/type, 90011 for body).
     */
    protected function checkParams($params)
    {
        $objValidator = Validator::make(
            [
                'exchange' => isset($params['exchange']) ? $params['exchange'] : '',
                'queue' => isset($params['queue']) ? $params['queue'] : '',
                'type' => isset($params['type']) ? $params['type'] : '',
            ],
            [
                'exchange' => 'required|string',
                'queue' => 'required|string',
                'type' => 'required|string',
            ]
        );
        if ($objValidator->fails()) {
            return self::error($objValidator->errors()->first(), 90010);
        }
        // array_key_exists(), not isset(): a null body must be rejected, not skipped.
        if (array_key_exists('body', $params)) {
            $objValidator = Validator::make(['body' => $params['body']], ['body' => 'required|array']);
            if ($objValidator->fails()) {
                return self::error($objValidator->errors()->first(), 90011);
            }
        }
        return true;
    }

    /**
     * Build the standard success result.
     *
     * @param mixed $data Payload placed under 'data'.
     * @return array
     */
    public static function success($data = [])
    {
        return ["code" => '0', "data" => $data, "msg" => "ok"];
    }

    /**
     * Build the standard error result.
     *
     * @param string $message
     * @param string|int $code Falsy values (0, '', null) are replaced by '90099'.
     * @return array
     */
    public static function error($message = '', $code = '')
    {
        $code = $code ?: '90099';
        return ["code" => strval($code), "data" => [], "msg" => $message];
    }

}